package com.iflytek.jzcpx.procuracy.ocr.component.mq;

import java.util.function.Consumer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.iflytek.jzcpx.procuracy.common.util.JSONUtil;
import com.iflytek.jzcpx.procuracy.ocr.service.OcrService;
import com.yomahub.tlog.core.mq.TLogMqConsumerProcessor;
import com.yomahub.tlog.core.mq.TLogMqRunner;
import com.yomahub.tlog.core.mq.TLogMqWrapBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 应用启动后, 每个队列启用一个读取线程, 阻塞式地队列中获取消息, 获取到消息之后,
 * 再将消息提交对应的消息处理线程池, 各线程池再调用不同的业务接口.
 * 如果队列中没有消息, 则消费消费线程等待 1 秒后, 再次重新读取.
 *
 * 1. 之所以有一个读取线程池(每个队列有一个读取线程), 是因为读取消息是阻塞式的, 期间会一直占用 redis 连接,
 * 因此不能直接使用各队列的消息处理线程池去获取消息, 那样会造成其他线程无连接可用.
 *
 * 2. 各消息处理线程使用的是SynchronousQueue, 是因为从队列读取到消息后,
 * 如果没有可用的处理线程, 而此时系统万一关机会导致消息丢失
 *
 * @author <a href=mailto:ktyi@iflytek.com>伊开堂</a>
 * @date 2019-08-14 18:41
 */
@Component
public class MessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @Autowired
    private OcrService ocrService;

    @JmsListener(destination = "${mq.queue.ocrTask}", concurrency = "${mq.queue.ocrTask.concurrency}")
    public void readOcrTaskQueue(String message) {
        logger.info("收到[ OCR识别 ]消息, message: {}", message);

        long start = System.currentTimeMillis();
        try {
            process(message, ocrService::processOcrTask);
        }
        catch (Exception e) {
            logger.warn("[ OCR识别 ]消息处理异常, message: {}", message, e);
        }
        finally {
            logger.info("[ OCR识别 ]消息处理结束, message: {}, 耗时: {}ms", message, System.currentTimeMillis() - start);
        }
    }
    @JmsListener(destination = "${mq.queue.ocrResult}", concurrency = "${mq.queue.ocrResult.concurrency}")
    public void readOcrResultPushQueue(String message) {
        logger.info("收到[ 推送OCR识别结果 ]消息, message: {}", message);

        long start = System.currentTimeMillis();
        try {
            process(message, ocrService::pushOcrResult);
        }
        catch (Exception e) {
            logger.warn("[ 推送OCR识别结果 ]消息处理异常, message: {}", message, e);
        }
        finally {
            logger.info("[ 推送OCR识别结果 ]消息处理结束, message: {}, 耗时: {}ms", message, System.currentTimeMillis() - start);
        }
    }

    @JmsListener(destination = "${mq.queue.recognizeTask}", concurrency = "${mq.queue.recognizeTask.concurrency}")
    public void readRecognizeTaskQueue(String message) {
        logger.info("收到[ 图像识别 ]消息, message: {}", message);

        long start = System.currentTimeMillis();
        try {
            process(message, ocrService::processRecognizeTask);
        }
        catch (Exception e) {
            logger.warn("[ 图像识别 ]消息处理异常, message: {}", message, e);
        }
        finally {
            logger.info("[ 图像识别 ]消息处理结束, message: {}, 耗时: {}ms", message, System.currentTimeMillis() - start);
        }
    }

    @JmsListener(destination = "${mq.queue.recognizeResult}", concurrency = "${mq.queue.recognizeResult.concurrency}")
    public void readRecognizeResultPushQueue(String message) {
        logger.info("收到[ 推送图像识别结果 ]消息, message: {}", message);

        long start = System.currentTimeMillis();
        try {
            process(message, ocrService::pushRecognizeResult);
        }
        catch (Exception e) {
            logger.warn("[ 推送图像识别结果 ]消息处理异常, message: {}", message, e);
        }
        finally {
            logger.info("[ 推送图像识别结果 ]消息处理结束, message: {}, 耗时: {}ms", message, System.currentTimeMillis() - start);
        }
    }
    private static void process(String message, Consumer<Long> consumer) {
        TLogMqWrapBean<Long> wrapBean = null;
        try {
            wrapBean = JSONUtil.toBean(message, new TypeReference<TLogMqWrapBean<Long>>() {});
        }
        catch (Exception e) {
            logger.warn("消息反序列化错误, 可能是消息体格式错误, 任务异常结束, 内容: {}", message, e);
            return;
        }
        TLogMqConsumerProcessor.process(wrapBean, new TLogMqRunner<Long>(){
            @Override
            public void mqConsume(Long id) {
                logger.info("处理业务消息, id: {}", id);
                try {
                    consumer.accept(id);
                }
                catch (Exception e) {
                    logger.info("处理业务消息发生异常, id: {}", id, e);
                }
            }
        });
    }
}


